<?php
/**
 * redis循环监控队列
 * @author zhiyong.luo
 * @date 2017-08-08
 */
// Declare the response as UTF-8 HTML; header() has to be called before any output is sent.
header("Content-type: text/html; charset=utf-8");
// Turn on display of PHP errors in the script output.
ini_set('display_errors', 1);
// Remove the execution time limit so the script is not aborted by max_execution_time.
set_time_limit(0);
include_once dirname(dirname(__FILE__)) . '/mgtv_init.php';
include_once dirname(dirname(dirname(__FILE__))) . '/nn_logic/message/nl_message.class.php';
include_once dirname(dirname(dirname(dirname(__FILE__)))) . '/np/np_redis_check.class.php';

/**
 * Timer task that re-checks delayed messages parked in per-CP Redis queues.
 *
 * For every CP whose config has delayed_message_import_enabled == '1', a batch
 * of queued commands is fetched from Redis. For each command the database is
 * checked for the asset the command refers to; when the asset is found the
 * originating message is reset so it can be processed again, otherwise the
 * command is handed back to the queue.
 */
class check_delayed_message extends nn_timer
{
	// Maximum number of queued commands fetched per CP on each run.
	public $check_num = 5000;

	/**
	 * Timer entry point: logs start/end around the actual work.
	 *
	 * @param mixed $params Unused by this task.
	 */
	public function action($params = null)
	{
		$this->msg('开始执行...');
		$this->do_timer_action();
		$this->msg('执行结束...');
	}

	/**
	 * Script body: loads all CPs and re-checks the delayed queue of each
	 * CP that has the feature enabled.
	 *
	 * @author zhiyong.luo
	 * @return bool false when the CP list could not be loaded or is empty, true otherwise
	 */
	private function do_timer_action()
	{
		$dc = nl_get_dc(array (
				"db_policy" => NL_DB_WRITE | NL_DB_READ,
				"cache_policy" => NP_KV_CACHE_TYPE_MEMCACHE
		));
		$dc->open();

		$result_cp = nl_cp::query_all($dc);
		if (!is_array($result_cp) || $result_cp['ret'] != 0 || empty($result_cp['data_info']) || !is_array($result_cp['data_info']))
		{
			// Leave a trace instead of silently ending the run.
			$this->msg('CP列表查询失败或为空，跳过本次检测');
			return false;
		}

		foreach ($result_cp['data_info'] as $cp_info)
		{
			$cp_config = isset($cp_info['nns_config']) ? json_decode($cp_info['nns_config'], true) : null;
			if (!is_array($cp_config)
				|| !isset($cp_config['delayed_message_import_enabled'])
				|| $cp_config['delayed_message_import_enabled'] != '1')
			{
				continue;
			}

			// Command expiry in seconds; fall back to one day when the setting is
			// missing or not a positive number. The value is cast once so that the
			// validated integer (not the raw config string) is what gets used.
			$overtime = isset($cp_config['delayed_message_import_overtime'])
				? (int)$cp_config['delayed_message_import_overtime']
				: 0;
			if ($overtime <= 0)
			{
				$overtime = 86400;
			}

			$this->recheck_cp_delayed_queue($dc, $cp_info['nns_id'], $overtime);
		}
		return true;
	}

	/**
	 * Fetches one batch of queued commands for a CP, resets the messages whose
	 * target asset is now present, and returns the remaining commands to the queue.
	 *
	 * Queue config shape (from the original author's note):
	 *   array("queue_name" => cp id, "recover_mode" => "loop", "command_lose_time" => expiry)
	 * Command payload fields (from the original author's note):
	 *   guid, message_id, action, type, import_id
	 *
	 * @param object $dc       data-center handle returned by nl_get_dc()
	 * @param string $cp_id    CP id, used as the queue name
	 * @param int    $overtime command expiry passed as command_lose_time
	 */
	private function recheck_cp_delayed_queue($dc, $cp_id, $overtime)
	{
		$config = array(
				"queue_name" => $cp_id,
				"command_lose_time" => $overtime
		);
		$redis_obj = nl_get_redis();
		$redis_check = new np_redis_check_class($redis_obj, $config);
		$result = $redis_check->get_command($this->check_num);
		if (!is_array($result) || empty($result))
		{
			return;
		}

		$back_result = array();
		foreach ($result as $value)
		{
			$command = isset($value['command']) ? json_decode($value['command'], true) : null;
			if (!is_array($command) || empty($command))
			{
				// Undecodable or empty payloads are dropped, not re-queued.
				continue;
			}

			// Only when the asset exists AND the message reset succeeded is the
			// command considered done; everything else goes back to the queue.
			if ($this->delayed_target_is_imported($dc, $command) && $this->reset_delayed_message_state($dc, $command))
			{
				continue;
			}

			$command['execute_time'] = date('Y-m-d H:i:s');
			$back_result[] = array(
				'time' => isset($value['time']) ? $value['time'] : null,
				'command' => json_encode($command),
			);
		}

		if (!empty($back_result))
		{
			$redis_check->recover_command($back_result);
		}
	}

	/**
	 * Checks whether the asset a queued command depends on is present in the database.
	 *
	 * - action 'asset_package': a non-deleted nns_vod_index row with the import id
	 *   must exist AND a c2 task of type 'index' for that row must have
	 *   nns_epg_status '99'.
	 * - any other action: a non-deleted nns_vod row is looked up first, then a
	 *   non-deleted nns_vod_index row.
	 *
	 * An array result from nl_query_by_db() is treated as "rows found", exactly as
	 * the original code did.
	 *
	 * @param object $dc      data-center handle
	 * @param array  $command decoded command payload
	 * @return bool true when the lookup(s) found the required rows
	 */
	private function delayed_target_is_imported($dc, $command)
	{
		// A command without a usable import id cannot match anything meaningful;
		// querying with '' could wrongly match rows whose import id is empty.
		if (!isset($command['import_id']) || !is_scalar($command['import_id']) || (string)$command['import_id'] === '')
		{
			return false;
		}
		$import_id = $this->quote_sql_value($command['import_id']);
		$action = isset($command['action']) ? $command['action'] : '';

		// Strict comparison: with a loose switch a numeric 0 action would match
		// 'asset_package' on PHP < 8.
		if ($action === 'asset_package')
		{
			$index_sql = "select nns_id from nns_vod_index where nns_import_id='{$import_id}' and nns_deleted != 1";
			$index_rows = nl_query_by_db($index_sql, $dc->db());
			if (!is_array($index_rows) || !isset($index_rows[0]['nns_id']))
			{
				return false;
			}
			// Episode exists; now verify that its c2 task reached EPG status '99'.
			$index_id = $this->quote_sql_value($index_rows[0]['nns_id']);
			$c2_sql = "select nns_id from nns_mgtvbk_c2_task where nns_type='index' and nns_ref_id='{$index_id}' and nns_epg_status='99' limit 1";
			return is_array(nl_query_by_db($c2_sql, $dc->db()));
		}

		// Online/offline style commands: main asset first, then episode.
		$vod_sql = "select nns_id from nns_vod where nns_asset_import_id='{$import_id}' and nns_deleted != 1";
		if (is_array(nl_query_by_db($vod_sql, $dc->db())))
		{
			return true;
		}
		$index_sql = "select nns_id from nns_vod_index where nns_import_id='{$import_id}' and nns_deleted != 1";
		return is_array(nl_query_by_db($index_sql, $dc->db()));
	}

	/**
	 * Puts the originating message back into its initial state so it is picked up again.
	 *
	 * The trailing `true` passed to update_message_state() is kept from the
	 * original call; per the original author's note it increments the retry
	 * counter — NOTE(review): confirm against nl_message.
	 *
	 * @param object $dc      data-center handle
	 * @param array  $command decoded command payload; its 'id' is the message id
	 * @return bool true when the update reported ret == '0'
	 */
	private function reset_delayed_message_state($dc, $command)
	{
		if (!isset($command['id']))
		{
			return false;
		}
		$update_fields = array(
				'nns_message_state' => 0,
				'nns_delete' => 0,
				'nns_modify_time' => date('Y-m-d H:i:s'),
		);
		$update_re = nl_message::update_message_state($dc, $command['id'], $update_fields, true);
		return is_array($update_re) && isset($update_re['ret']) && $update_re['ret'] == '0';
	}

	/**
	 * Escapes a value for use inside a single-quoted SQL string literal.
	 *
	 * NOTE(review): addslashes() is a stopgap because the escaping facility of the
	 * project's DB layer is not visible from this file; switch to the driver's
	 * native escaping / bound parameters if nl_query_by_db's layer exposes one.
	 *
	 * @param mixed $value scalar value taken from the queue payload or a DB row
	 * @return string escaped string
	 */
	private function quote_sql_value($value)
	{
		return addslashes((string)$value);
	}
}
// Create the timer task instance.
// NOTE(review): the three constructor arguments (a task name string, 'public', and this
// script's path) are interpreted by nn_timer, which is not visible here — confirm their meaning there.
$check_delayed_message = new check_delayed_message("check_delayed_message", 'public',__FILE__);
// run() is not defined in this file; presumably inherited from nn_timer and expected to
// end up calling action() — confirm against nn_timer.
$check_delayed_message->run();